package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * 本地测试的 wordcount程序
 */
public class WordCountLocal {

    public static void main(String[] args) {

        // 编写spark应用程序步骤
        // 本地执行，是可以直接在main方法中执行的

        // 第一步：创建SparkConf对象, 设置Spark应用的配置信息
        // 使用setMaster()可以设置Spark应用程序要链接的spark集群的master节点的url，如果设置为local则代表在本地运行
        SparkConf conf = new SparkConf()
                .setAppName("WordCountLocal")
                .setMaster("local");

        // 第二步：创建JavaSparkContext对象
        // 在Spark中，SparkContext是Spark所有功能的一个入口，无论是用java、scala，甚至使用python编写
        // 都必须要有一个SparkContext，他的主要作用包括初始化Spark应用程序所需的一些核心组件，包括：调度器，还会去到SparkMaster节点上注册，等等
        // 但是在Spark中，编写不同类型的Spark应用程序，使用的SparkContext是不同的，如果使用scala，使用的是原生的SparkContext对象，
            //如果是使用Java， 那么使用JavaSparkContext对象
            //如果开发Spark SQL程序，那么就是SQLContext，HiveContext
            //如果开发Spark Streaming，那么就是他独有的SparkContext
            //以此类推
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 第三步：要针对输入源(hdfs文件，本地文件， hive，等等)，创建一个初始化的RDD
        // 输入源中的数据会打散，分配到RDD的每个partition中，从而形成一个初始的分布式数据集
        // 因为是本地测试，所以针对本地文件
        // SparkContext中，用于根据文件类型的输入创建RDD的方法叫做textFile()方法
        // 在Java中创建的普通RDD，都叫作JavaRDD
        // 在这里，RDD中有元素的概念，如果是hdfs或本地文件，创建的RDD，每一个元素就相当于是文件里的一行
        JavaRDD<String> lines = sc.textFile("C:\\Users\\XBZX\\Desktop\\spark.txt");

        // 第四步：对初始RDD进行transformation操作，也就是计算操作
        // 通常操作会通过创建function，并配合RDD的map，flatMap等算子来执行
        // function如果比较简单，则创建指定function的匿名内部类，如果function比较复杂，则单独创建一个类，作为实现这个function接口的类

        // 先将每一行拆分成单个的单词，
        // FlatMapFunction 有两个泛型参数，分别代表输入和输出类型
        JavaRDD words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // 接着，需要将每一个单词映射为(单词，1)这种格式，这样才能对每个单词的出现次数进行累加
        // mapToPair 就是将每个元素映射为一个(v1, v2)这样Tuple2类型的元素
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // 接着，需要以单词作为key，统计每个单词出现的次数
        // 这里需要reduceByKey这个算子，对每个key对应的value都进行reduce操作
        // 比如JavaPair中有几个元素，分别为(hello, 1),(hello, 1), (word, 1)
        // reduce操作，相当于把第一个值和第二个值进行计算，然后再将结果与第三个值进行计算
        // 最后返回的JavaPairRDD中的元素也是tuple，但是第一个值就是每个key，第二个值是每个key的value
        // reduce之后的结果，相当于就是每个单词出现的次数
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 到这里为止，我们通过spark算子操作，已经统计出了单词的个数，
        // 但是之前使用的flatMap，mapToPair， reduceByKey这种操作都叫做transformation操作
        // 一个spark应用中，光是有transformation操作是不行的，是不会执行的，必须要有一种叫action的操作
        // 最后使用一种叫做action的操作，比如foreach来触发程序的执行
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
            }
        });

        sc.close();
    }

}
